Multiprocess Learning (Ape-X)

Since version 9.4.2, the new classes MPReplayBuffer and MPPrioritizedReplayBuffer efficiently support multiprocess learning in the style of Ape-X (a single learner with multiple explorers) on a single machine.

1 Shared Memory

First of all, MPReplayBuffer and MPPrioritizedReplayBuffer map their internal data onto shared memory. This means you don’t need a proxy (e.g. multiprocessing.managers.SyncManager) or a queue (e.g. multiprocessing.Queue) for interprocess data sharing; you can simply access the buffer object from different processes.

from multiprocessing import Process
from cpprb import MPPrioritizedReplayBuffer

rb = MPPrioritizedReplayBuffer(100, {"obs": {}, "done": {}})

def explorer(rb):
    for _ in range(100):
        # Something ... (e.g. interact with the environment)
        obs, done = 0.0, 0.0  # placeholder values so this snippet runs
        rb.add(obs=obs, done=done)

p = Process(target=explorer, args=[rb])  # You can simply pass the buffer to Process as an argument
p.start()
p.join()

sample = rb.sample(10)  # You can access data stored by a different process.

2 Efficient Lock

Although you can implement Ape-X with the ordinary ReplayBuffer or PrioritizedReplayBuffer classes, locking the entire buffer on every write and read is quite inefficient.

# Part of a naive explorer implementation
if local_buffer.get_stored_size() > local_size:
    local_sample = local_buffer.get_all_transitions()

    with lock: # Inefficient: locks the entire buffer during addition
        global_buffer.add(**local_sample)

MPReplayBuffer and MPPrioritizedReplayBuffer automatically lock only the critical sections instead of the entire buffer. For example, since successive add calls write to different memory addresses, the critical section covers only fetching and incrementing the write index. This reduced locking allows multiple explorers to add transitions in parallel.1
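To make the idea concrete, here is a minimal, self-contained sketch (illustrative only, not cpprb’s actual implementation; all names are made up): the lock is held only while fetching and incrementing the shared write index, and the copy into shared memory happens outside the lock because each writer owns a distinct slot.

from multiprocessing import Process, Lock, Value, Array

BUFFER_SIZE = 100

def add(storage, next_index, lock, value):
    # Critical section: only fetching and incrementing the write index.
    with lock:
        idx = next_index.value
        next_index.value = (idx + 1) % BUFFER_SIZE
    # Outside the lock: each writer fills its own reserved slot,
    # so parallel writers never touch the same address.
    storage[idx] = value

if __name__ == "__main__":
    storage = Array("d", BUFFER_SIZE, lock=False)  # shared payload, no implicit lock
    next_index = Value("i", 0, lock=False)         # shared write index
    lock = Lock()                                  # guards only the index update

    ps = [Process(target=add, args=(storage, next_index, lock, float(i)))
          for i in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()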

We adopt an exclusive-read concurrent-write model for access control. Multiple writers are allowed to work in parallel, and the number of writers inside the critical section is tracked atomically. Reading has higher priority and prevents writers (a.k.a. actors) from entering the critical section again. When all writers have exited the critical section, the reader (a.k.a. learner) starts working in it.

We restrict the number of learners to 1. If we allowed multiple learners, which have higher priority, actors might never enter the critical section, which is undesirable for reinforcement learning.
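The access control described above can be sketched in pure Python as follows (a simplified illustration with made-up names; cpprb implements this internally with atomic operations, not with this class):

from multiprocessing import Condition, Value

class ReadExclusiveLock:
    def __init__(self):
        self._cond = Condition()
        self._writers = Value("i", 0, lock=False)         # writers inside the critical section
        self._reader_waiting = Value("b", 0, lock=False)  # reader-pending flag

    def writer_enter(self):
        with self._cond:
            # Reading has priority: new writers wait while the reader is pending.
            while self._reader_waiting.value:
                self._cond.wait()
            self._writers.value += 1  # multiple writers may be inside at once

    def writer_exit(self):
        with self._cond:
            self._writers.value -= 1
            if self._writers.value == 0:
                self._cond.notify_all()  # last writer out lets the reader proceed

    def reader_enter(self):
        with self._cond:
            self._reader_waiting.value = 1  # block new writers from entering
            while self._writers.value > 0:  # wait until current writers drain
                self._cond.wait()

    def reader_exit(self):
        with self._cond:
            self._reader_waiting.value = 0
            self._cond.notify_all()         # writers may enter again

In this sketch, actors would wrap each add between writer_enter / writer_exit, and the single learner would wrap sample between reader_enter / reader_exit.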

3 Limitation

MPReplayBuffer and MPPrioritizedReplayBuffer don’t support the Nstep Experience Replay, Memory Compression, and Map Data on File features. (You can still utilize these features in the explorers’ local buffers, as sketched below.)
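For example, an explorer can still apply Nstep Experience Replay in its process-local buffer and merge the resulting transitions into the shared buffer. A minimal sketch (the Nstep parameter values are only examples):

from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer

env_dict = {"obs": {"shape": 4}, "act": {}, "rew": {},
            "next_obs": {"shape": 4}, "done": {}}

global_rb = MPPrioritizedReplayBuffer(int(1e+6), env_dict)

# The local buffer lives in a single explorer process,
# so it can use Nstep even though the shared buffer cannot.
local_rb = ReplayBuffer(100, env_dict,
                        Nstep={"size": 4, "gamma": 0.99,
                               "rew": "rew", "next": "next_obs"})

# ... the explorer fills local_rb, then flushes into the shared buffer:
# global_rb.add(**local_rb.get_all_transitions())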

MPReplayBuffer and MPPrioritizedReplayBuffer assume a single learner (sample / update_priorities) and multiple explorers (add). You must not call the learner methods from multiple processes simultaneously.

4 Example Code

from multiprocessing import Process, Event, SimpleQueue
import time

import gym
import numpy as np
from tqdm import tqdm

from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer

class MyModel:
    def __init__(self):
        self._weights = 0

    def get_action(self,obs):
        # Implement action selection
        return 0

    def abs_TD_error(self,sample):
        # Implement absolute TD error
        return np.zeros(sample["obs"].shape[0])

    @property
    def weights(self):
        return self._weights

    @weights.setter
    def weights(self,w):
        self._weights = w

    def train(self,sample):
        # Implement model update
        pass

def explorer(global_rb,env_dict,is_training_done,queue):
    local_buffer_size = int(1e+2)
    local_rb = ReplayBuffer(local_buffer_size,env_dict)

    model = MyModel()
    env = gym.make("CartPole-v1")

    obs = env.reset()
    while not is_training_done.is_set():
        if not queue.empty():
            w = queue.get()
            model.weights = w

        action = model.get_action(obs)
        next_obs, reward, done, _ = env.step(action)

        local_rb.add(obs=obs,act=action,rew=reward,next_obs=next_obs,done=done)

        if done:
            obs = env.reset()
        else:
            obs = next_obs

        if local_rb.get_stored_size() == local_buffer_size:
            local_sample = local_rb.get_all_transitions()
            local_rb.clear()

            absTD = model.abs_TD_error(local_sample)
            global_rb.add(**local_sample,priorities=absTD)

def learner(global_rb,queues):
    batch_size = 64
    n_warmup = 100
    n_training_step = int(1e+4)
    explorer_update_freq = 100

    model = MyModel()

    while global_rb.get_stored_size() < n_warmup:
        time.sleep(1)

    for step in tqdm(range(n_training_step)):
        sample = global_rb.sample(batch_size)

        model.train(sample)
        absTD = model.abs_TD_error(sample)
        global_rb.update_priorities(sample["indexes"],absTD)

        if step % explorer_update_freq == 0:
            w = model.weights
            for q in queues:
                q.put(w)

if __name__ == "__main__":
    buffer_size = int(1e+6)
    env_dict = {"obs": {"shape": 4},
                "act": {},
                "rew": {},
                "next_obs": {"shape": 4},
                "done": {}}
    n_explorer = 4

    global_rb = MPPrioritizedReplayBuffer(buffer_size,env_dict)

    is_training_done = Event()

    qs = [SimpleQueue() for _ in range(n_explorer)]
    ps = [Process(target=explorer,
                  args=[global_rb,env_dict,is_training_done,q])
          for q in qs]

    for p in ps:
        p.start()

    learner(global_rb,qs)

    is_training_done.set()

    for p in ps:
        p.join()

  1. Updating the segment tree for PER is a critical section, too. To avoid data races, MPPrioritizedReplayBuffer lazily updates the segment tree from the learner process just before the sample method. ↩︎